跳到主要内容

Redis 事务的基本使用

基础概念类面试题

1. 什么是 Redis 事务?它与关系数据库的事务有什么区别?

答案要点:

  • Redis 事务是使用 MULTI 和 EXEC 命令包围的一组命令
  • 保证一个客户端在不被其他客户端打断的情况下执行多个命令
  • 关键区别:Redis 事务不支持回滚,即使某个命令失败,其他命令依然会执行
  • Redis 事务是原子性的批量操作,但不是 ACID 中的原子性

2. Redis 事务的执行流程是什么?请画出时序图

执行流程说明:

  1. 开始事务:客户端发送 MULTI 命令
  2. 命令入队:后续命令不立即执行,而是放入队列
  3. 执行事务:EXEC 命令触发队列中所有命令的原子性执行

3. Redis 事务有哪些相关命令?分别的作用是什么?

命令作用使用场景
MULTI标记事务开始开启事务块
EXEC执行事务中的所有命令提交事务
DISCARD取消事务放弃事务中的所有命令
WATCH监视键值对实现乐观锁
UNWATCH取消所有监视取消 WATCH 监视

高级特性类面试题

4. 什么是 WATCH 命令?它是如何实现乐观锁的?

WATCH 命令实现乐观锁的流程:

实现原理:

  • WATCH 基于 CAS(Compare And Swap)机制
  • 执行 EXEC 时,Redis 会检查被监视键的版本号
  • 如果版本号发生变化,说明有其他客户端修改了数据,事务回滚
  • 这是乐观锁的典型实现,避免了悲观锁的阻塞问题

5. 为什么 Redis 选择乐观锁而不是悲观锁?

答案要点:

  1. 性能考虑:持有锁的客户端运行越慢,等待解锁的客户端被阻塞时间越长
  2. 并发性:客户端永远不必花时间等待锁,只需要在事务失败时重试
  3. Redis 特性:Redis 是单线程模型,乐观锁更适合高并发场景
  4. 用户体验:减少客户端等待时间,提高系统响应性

6. 在高并发场景下,不使用事务会出现什么问题?请用 Go 代码演示

package main

import (
"fmt"
"sync"
"time"
"github.com/go-redis/redis/v8"
"context"
)

// 不使用事务的问题演示
func withoutTransaction() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
ctx := context.Background()

// 重置计数器
rdb.Set(ctx, "counter", 0, 0)

var wg sync.WaitGroup

// 启动3个goroutine同时操作
for i := 0; i < 3; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

// 读取当前值
val1, _ := rdb.Get(ctx, "counter").Result()
fmt.Printf("Goroutine %d: 初始值 %s\n", id, val1)

// 自增
rdb.Incr(ctx, "counter")

// 模拟业务处理时间
time.Sleep(time.Millisecond)

// 读取自增后的值
val2, _ := rdb.Get(ctx, "counter").Result()
fmt.Printf("Goroutine %d: 自增后值 %s\n", id, val2)

// 自减
rdb.Decr(ctx, "counter")

// 读取最终值
val3, _ := rdb.Get(ctx, "counter").Result()
fmt.Printf("Goroutine %d: 最终值 %s\n", id, val3)
}(i)
}

wg.Wait()
}

问题分析:

  • 虽然最终结果可能是正确的,但中间状态可能被其他操作读取到
  • 在高并发下,可能出现数据不一致的临时状态

性能优化类面试题

7. 什么是 Redis Pipeline?它与事务有什么关系?

Pipeline 的执行流程对比:

普通命令执行:

Pipeline 执行:

Go 代码示例:

// 使用 Pipeline 优化性能
func usePipeline() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
ctx := context.Background()

pipe := rdb.Pipeline()

// 批量添加命令
for i := 0; i < 1000; i++ {
pipe.Set(ctx, fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i), 0)
}

// 执行所有命令
_, err := pipe.Exec(ctx)
if err != nil {
fmt.Printf("Pipeline error: %v\n", err)
}
}

// Pipeline 与事务结合
func pipelineWithTransaction() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
ctx := context.Background()

pipe := rdb.TxPipeline() // 事务型 Pipeline

pipe.Set(ctx, "key1", "value1", 0)
pipe.Incr(ctx, "counter")
pipe.Get(ctx, "key1")

results, err := pipe.Exec(ctx)
if err != nil {
fmt.Printf("Transaction Pipeline error: %v\n", err)
}

for _, result := range results {
fmt.Printf("Result: %v\n", result)
}
}

8. Pipeline 的局限性有哪些?在什么场景下不适用?

局限性:

  1. 内存限制:命令队列存储在内存中,不宜一次性发送过多命令
  2. 无法中途查询:无法在 Pipeline 执行过程中获取中间结果
  3. 错误处理:无法根据某个命令的结果决定后续命令的执行

不适用场景:

// 错误示例:依赖中间结果的操作
func wrongPipelineUsage() {
pipe := rdb.Pipeline()

// 这种场景不适合用 Pipeline
result := pipe.Get(ctx, "user_balance")
// 无法在这里获取 result 的值来判断余额是否充足
pipe.DecrBy(ctx, "user_balance", 100) // 可能导致余额为负

pipe.Exec(ctx)
}

// 正确做法:先查询再决定
func correctApproach() {
balance, err := rdb.Get(ctx, "user_balance").Int()
if err != nil || balance < 100 {
return fmt.Errorf("余额不足")
}

// 使用 WATCH 实现乐观锁
return rdb.Watch(ctx, func(tx *redis.Tx) error {
pipe := tx.TxPipeline()
pipe.DecrBy(ctx, "user_balance", 100)
_, err := pipe.Exec(ctx)
return err
}, "user_balance")
}

实战应用类面试题

9. 如何使用 Redis 事务实现分布式锁?

func acquireLockWithTransaction(lockKey, identifier string, timeout time.Duration) bool {
ctx := context.Background()
end := time.Now().Add(timeout)

for time.Now().Before(end) {
err := rdb.Watch(ctx, func(tx *redis.Tx) error {
// 检查锁是否存在
if tx.Exists(ctx, lockKey).Val() == 0 {
// 锁不存在,尝试获取
pipe := tx.TxPipeline()
pipe.Set(ctx, lockKey, identifier, time.Second*10)
_, err := pipe.Exec(ctx)
return err
}
return fmt.Errorf("lock exists")
}, lockKey)

if err == nil {
return true // 获取锁成功
}

time.Sleep(time.Millisecond)
}

return false // 获取锁失败
}

10. 在电商系统中,如何使用 Redis 事务处理库存扣减?

type InventoryService struct {
rdb *redis.Client
}

func (s *InventoryService) DeductInventory(productID string, quantity int) error {
ctx := context.Background()
inventoryKey := fmt.Sprintf("inventory:%s", productID)

return s.rdb.Watch(ctx, func(tx *redis.Tx) error {
// 检查当前库存
currentStock, err := tx.Get(ctx, inventoryKey).Int()
if err != nil {
return fmt.Errorf("获取库存失败: %v", err)
}

if currentStock < quantity {
return fmt.Errorf("库存不足,当前库存: %d, 需要: %d", currentStock, quantity)
}

// 使用事务扣减库存
pipe := tx.TxPipeline()
pipe.DecrBy(ctx, inventoryKey, int64(quantity))

// 记录扣减日志
logKey := fmt.Sprintf("inventory_log:%s:%d", productID, time.Now().Unix())
pipe.HMSet(ctx, logKey, map[string]interface{}{
"product_id": productID,
"quantity": quantity,
"timestamp": time.Now().Unix(),
"action": "deduct",
})

_, err = pipe.Exec(ctx)
return err
}, inventoryKey)
}

11. Redis 事务在什么情况下会失败?如何处理失败场景?

事务失败的情况:

Go 代码处理示例:

func handleTransactionFailures() {
ctx := context.Background()
maxRetries := 3

for i := 0; i < maxRetries; i++ {
err := rdb.Watch(ctx, func(tx *redis.Tx) error {
pipe := tx.TxPipeline()

// 添加多个命令
cmd1 := pipe.Set(ctx, "key1", "value1", 0)
cmd2 := pipe.Incr(ctx, "counter")
cmd3 := pipe.LPush(ctx, "list", "item")

results, err := pipe.Exec(ctx)
if err != nil {
return err
}

// 检查每个命令的执行结果
if cmd1.Err() != nil {
return fmt.Errorf("set 命令失败: %v", cmd1.Err())
}
if cmd2.Err() != nil {
return fmt.Errorf("incr 命令失败: %v", cmd2.Err())
}
if cmd3.Err() != nil {
return fmt.Errorf("lpush 命令失败: %v", cmd3.Err())
}

fmt.Printf("事务执行成功,结果数量: %d\n", len(results))
return nil
})

if err == nil {
break // 成功执行
}

if i == maxRetries-1 {
fmt.Printf("事务执行失败,已重试 %d 次: %v\n", maxRetries, err)
} else {
fmt.Printf("事务执行失败,第 %d 次重试: %v\n", i+1, err)
time.Sleep(time.Millisecond * 10) // 短暂延迟后重试
}
}
}

面试官可能的追问

12. Redis 事务能保证 ACID 特性吗?

答案要点:

  • 原子性(Atomicity):部分支持,要么全部入队,要么全部执行,但单个命令失败不会回滚其他命令
  • 一致性(Consistency):支持,Redis 保证数据类型的正确性
  • 隔离性(Isolation):支持,事务执行期间不会被其他客户端命令打断
  • 持久性(Durability):依赖 Redis 的持久化配置(RDB/AOF)

13. 在微服务架构中,Redis 事务有什么最佳实践?

最佳实践:

  1. 避免长事务:减少事务中的命令数量,避免阻塞
  2. 合理使用 WATCH:只监视必要的键,减少冲突概率
  3. 实现重试机制:处理乐观锁冲突的重试逻辑
  4. 错误处理:检查每个命令的执行结果
  5. 性能监控:监控事务成功率和重试次数